-
Notifications
You must be signed in to change notification settings - Fork 357
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
multi chain rewind service #2673
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good. It would be good to have a test/simulation of the multichain rewind
@@ -158,6 +173,10 @@ export class StoreService { | |||
this.subqueryProject.network.chainId | |||
); | |||
|
|||
if (this.config.historical === 'timestamp') { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think the assumption can be made that timestamp means multichain. Its possible to have timestamp historical without multichain
private async getRewindTimestamp(): Promise<number | undefined> { | ||
const rewindTimestampKey = generateRewindTimestampKey(this.subqueryProject.network.chainId); | ||
const record = await this.globalDataRepo.findByPk(rewindTimestampKey); | ||
if (hasValue(record)) { | ||
return record.toJSON().value as number; | ||
} | ||
} | ||
|
||
private async initChainRewindTimestamp() { | ||
if (this.historical !== 'timestamp') return; | ||
if ((await this.getRewindTimestamp()) !== undefined) return; | ||
const rewindTimestampKey = generateRewindTimestampKey(this.subqueryProject.network.chainId); | ||
await this.globalDataRepo.create({key: rewindTimestampKey, value: 0}); | ||
} | ||
|
||
async getLastProcessedBlock(): Promise<{height: number; timestamp?: number}> { | ||
const {lastProcessedBlockTimestamp: timestamp, lastProcessedHeight: height} = await this.metadataModel.findMany([ | ||
'lastProcessedHeight', | ||
'lastProcessedBlockTimestamp', | ||
]); | ||
|
||
return {height: height || 0, timestamp}; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can be moved to a GlobalPlainModel
|
||
// Creating a new pgClient is to avoid using the same database connection as the block scheduler, | ||
// which may prevent real-time listening to rollback events. | ||
const pgPool = new Pool(getPgPoolConfig(this.nodeConfig)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should use the existing pool instead of creating a new one. The existing one can be exposed if necessary.
@@ -58,6 +58,7 @@ export interface IBlockchainService< | |||
// Unfinalized blocks | |||
getHeaderForHash(hash: string): Promise<Header>; | |||
getHeaderForHeight(height: number): Promise<Header>; | |||
getRequiredHeaderForHeight(height: number): Promise<Required<Header>>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this? getHeaderForHeight
should always return a timestamp but there is one substrate chain that doesn't have timestamps in blocks so that is why it is optional
// If we're rewinding, we should wait until it's done | ||
const multiChainStatus = this.multiChainRewindService.getStatus(); | ||
if (RewindStatus.Normal !== multiChainStatus) { | ||
logger.info(`Wait for all chains to complete rewind, current chainId: ${this.multiChainRewindService.chainId}`); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logger.info(`Wait for all chains to complete rewind, current chainId: ${this.multiChainRewindService.chainId}`); | |
logger.info(`Waiting for all chains to complete rewind, current chainId: ${this.multiChainRewindService.chainId}`); |
const pgPool = new Pool(getPgPoolConfig(this.nodeConfig)); | ||
this.pgListener = await pgPool.connect(); | ||
|
||
this.pgListener.on('notification', (msg) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing to check here is that if the connection(s) are dropped whether the notification is reinstated once it connects
this.waitRewindHeader = undefined; | ||
break; | ||
default: | ||
throw new Error(`Unknown rewind event: ${eventType}`); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this will run into other issues. We have other triggers (subscriptions)
if (rewindLock) { | ||
this.status = RewindStatus.WaitOtherChain; | ||
} | ||
if (rewindTimestamp) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be if else
?
|
||
const tx = await this.sequelize.transaction(); | ||
try { | ||
const [_, updateRows] = await this.sequelize.query( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these global table db operations can be moved into a PlainGlobalModel. It would be good to have all db operations in a single location
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this query string be generalised between metadata?
poiService?: PoiService, | ||
forceCleanService?: ForceCleanService | ||
): Promise<void> { | ||
const lastUnit = storeService.historical === 'timestamp' ? lastProcessed.timestamp : lastProcessed.height; | ||
const isMultiChain = storeService.historical === 'timestamp'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as the other comment, historical mode doesn't indicate that its multichain
Description
In a multi-chain project, after enabling historical mode, a rewind requires coordination with other processes for rollback. Below are the tasks we need to accomplish.
Fixes # (issue)
#2620
TODO
_global.rewind_lock
exist. If they do, perform a rollback.rewind_timestamp
exists. If it does, perform a rewind._global
contain therewind_timestamp
. If none exist, release_global.rewind_lock
.timestamp
, It may require using the binary search method.Type of change
Please delete options that are not relevant.
Checklist